// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

package storage

import (
	"errors"
	"fmt"
	"os"
	"syscall"

	"github.com/golang/protobuf/proto"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/schemapb"
	"github.com/milvus-io/milvus/internal/util/tsoutil"
)

func PrintBinlogFiles(fileList []string) error {
	for _, file := range fileList {
		if err := printBinlogFile(file); err != nil {
			return err
		}
	}
	return nil
}

func printBinlogFile(filename string) error {
	fd, err := os.OpenFile(filename, os.O_RDONLY, 0400)
	if err != nil {
		return err
	}
	defer fd.Close()

	fileInfo, err := fd.Stat()
	if err != nil {
		return err
	}

	fmt.Printf("file size = %d\n", fileInfo.Size())

	b, err := syscall.Mmap(int(fd.Fd()), 0, int(fileInfo.Size()), syscall.PROT_READ, syscall.MAP_SHARED)
	if err != nil {
		return nil
	}
	defer syscall.Munmap(b)

	fmt.Printf("buf size = %d\n", len(b))

	r, err := NewBinlogReader(b)
	if err != nil {
		return err
	}
	defer r.Close()

	fmt.Println("descriptor event header:")
	physical, _ := tsoutil.ParseTS(r.descriptorEvent.descriptorEventHeader.Timestamp)
	fmt.Printf("\tTimestamp: %v\n", physical)
	fmt.Printf("\tTypeCode: %s\n", r.descriptorEvent.descriptorEventHeader.TypeCode.String())
	fmt.Printf("\tServerID: %d\n", r.descriptorEvent.descriptorEventHeader.ServerID)
	fmt.Printf("\tEventLength: %d\n", r.descriptorEvent.descriptorEventHeader.EventLength)
	fmt.Printf("\tNextPosition :%d\n", r.descriptorEvent.descriptorEventHeader.NextPosition)
	fmt.Println("descriptor event data:")
	fmt.Printf("\tBinlogVersion: %d\n", r.descriptorEvent.descriptorEventData.BinlogVersion)
	fmt.Printf("\tServerVersion: %d\n", r.descriptorEvent.descriptorEventData.ServerVersion)
	fmt.Printf("\tCommitID: %d\n", r.descriptorEvent.descriptorEventData.CommitID)
	fmt.Printf("\tHeaderLength: %d\n", r.descriptorEvent.descriptorEventData.HeaderLength)
	fmt.Printf("\tCollectionID: %d\n", r.descriptorEvent.descriptorEventData.CollectionID)
	fmt.Printf("\tPartitionID: %d\n", r.descriptorEvent.descriptorEventData.PartitionID)
	fmt.Printf("\tSegmentID: %d\n", r.descriptorEvent.descriptorEventData.SegmentID)
	fmt.Printf("\tFieldID: %d\n", r.descriptorEvent.descriptorEventData.FieldID)
	physical, _ = tsoutil.ParseTS(r.descriptorEvent.descriptorEventData.StartTimestamp)
	fmt.Printf("\tStartTimestamp: %v\n", physical)
	physical, _ = tsoutil.ParseTS(r.descriptorEvent.descriptorEventData.EndTimestamp)
	fmt.Printf("\tEndTimestamp: %v\n", physical)
	dataTypeName, ok := schemapb.DataType_name[int32(r.descriptorEvent.descriptorEventData.PayloadDataType)]
	if !ok {
		return fmt.Errorf("undefine data type %d", r.descriptorEvent.descriptorEventData.PayloadDataType)
	}
	fmt.Printf("\tPayloadDataType: %v\n", dataTypeName)
	fmt.Printf("\tPostHeaderLengths: %v\n", r.descriptorEvent.descriptorEventData.PostHeaderLengths)
	eventNum := 0
	for {
		event, err := r.NextEventReader()
		if err != nil {
			return err
		}
		if event == nil {
			break
		}
		fmt.Printf("event %d header:\n", eventNum)
		physical, _ = tsoutil.ParseTS(event.eventHeader.Timestamp)
		fmt.Printf("\tTimestamp: %v\n", physical)
		fmt.Printf("\tTypeCode: %s\n", event.eventHeader.TypeCode.String())
		fmt.Printf("\tServerID: %d\n", event.eventHeader.ServerID)
		fmt.Printf("\tEventLength: %d\n", event.eventHeader.EventLength)
		fmt.Printf("\tNextPosition: %d\n", event.eventHeader.NextPosition)
		switch event.eventHeader.TypeCode {
		case InsertEventType:
			evd, ok := event.eventData.(*insertEventData)
			if !ok {
				return errors.New("incorrect event data type")
			}
			fmt.Printf("event %d insert event:\n", eventNum)
			physical, _ = tsoutil.ParseTS(evd.StartTimestamp)
			fmt.Printf("\tStartTimestamp: %v\n", physical)
			physical, _ = tsoutil.ParseTS(evd.EndTimestamp)
			fmt.Printf("\tEndTimestamp: %v\n", physical)
			if err := printPayloadValues(r.descriptorEvent.descriptorEventData.PayloadDataType, event.PayloadReaderInterface); err != nil {
				return err
			}
		case DeleteEventType:
			evd, ok := event.eventData.(*deleteEventData)
			if !ok {
				return errors.New("incorrect event data type")
			}
			fmt.Printf("event %d delete event:\n", eventNum)
			physical, _ = tsoutil.ParseTS(evd.StartTimestamp)
			fmt.Printf("\tStartTimestamp: %v\n", physical)
			physical, _ = tsoutil.ParseTS(evd.EndTimestamp)
			fmt.Printf("\tEndTimestamp: %v\n", physical)
			if err := printPayloadValues(r.descriptorEvent.descriptorEventData.PayloadDataType, event.PayloadReaderInterface); err != nil {
				return err
			}
		case CreateCollectionEventType:
			evd, ok := event.eventData.(*createCollectionEventData)
			if !ok {
				return errors.New("incorrect event data type")
			}
			fmt.Printf("event %d create collection event:\n", eventNum)
			physical, _ = tsoutil.ParseTS(evd.StartTimestamp)
			fmt.Printf("\tStartTimestamp: %v\n", physical)
			physical, _ = tsoutil.ParseTS(evd.EndTimestamp)
			fmt.Printf("\tEndTimestamp: %v\n", physical)
			if err := printDDLPayloadValues(event.eventHeader.TypeCode, r.descriptorEvent.descriptorEventData.PayloadDataType, event.PayloadReaderInterface); err != nil {
				return err
			}
		case DropCollectionEventType:
			evd, ok := event.eventData.(*dropCollectionEventData)
			if !ok {
				return errors.New("incorrect event data type")
			}
			fmt.Printf("event %d drop collection event:\n", eventNum)
			physical, _ = tsoutil.ParseTS(evd.StartTimestamp)
			fmt.Printf("\tStartTimestamp: %v\n", physical)
			physical, _ = tsoutil.ParseTS(evd.EndTimestamp)
			fmt.Printf("\tEndTimestamp: %v\n", physical)
			if err := printDDLPayloadValues(event.eventHeader.TypeCode, r.descriptorEvent.descriptorEventData.PayloadDataType, event.PayloadReaderInterface); err != nil {
				return err
			}
		case CreatePartitionEventType:
			evd, ok := event.eventData.(*createPartitionEventData)
			if !ok {
				return errors.New("incorrect event data type")
			}
			fmt.Printf("event %d create partition event:\n", eventNum)
			physical, _ = tsoutil.ParseTS(evd.StartTimestamp)
			fmt.Printf("\tStartTimestamp: %v\n", physical)
			physical, _ = tsoutil.ParseTS(evd.EndTimestamp)
			fmt.Printf("\tEndTimestamp: %v\n", physical)
			if err := printDDLPayloadValues(event.eventHeader.TypeCode, r.descriptorEvent.descriptorEventData.PayloadDataType, event.PayloadReaderInterface); err != nil {
				return err
			}
		case DropPartitionEventType:
			evd, ok := event.eventData.(*dropPartitionEventData)
			if !ok {
				return errors.New("incorrect event data type")
			}
			fmt.Printf("event %d drop partition event:\n", eventNum)
			physical, _ = tsoutil.ParseTS(evd.StartTimestamp)
			fmt.Printf("\tStartTimestamp: %v\n", physical)
			physical, _ = tsoutil.ParseTS(evd.EndTimestamp)
			fmt.Printf("\tEndTimestamp: %v\n", physical)
			if err := printDDLPayloadValues(event.eventHeader.TypeCode, r.descriptorEvent.descriptorEventData.PayloadDataType, event.PayloadReaderInterface); err != nil {
				return err
			}
		default:
			return fmt.Errorf("undefined event typd %d", event.eventHeader.TypeCode)
		}
		eventNum++
	}

	return nil
}

func printPayloadValues(colType schemapb.DataType, reader PayloadReaderInterface) error {
	fmt.Println("\tpayload values:")
	switch colType {
	case schemapb.DataType_Bool:
		val, err := reader.GetBoolFromPayload()
		if err != nil {
			return err
		}
		for i, v := range val {
			fmt.Printf("\t\t%d : %v\n", i, v)
		}
	case schemapb.DataType_Int8:
		val, err := reader.GetInt8FromPayload()
		if err != nil {
			return err
		}
		for i, v := range val {
			fmt.Printf("\t\t%d : %d\n", i, v)
		}
	case schemapb.DataType_Int16:
		val, err := reader.GetInt16FromPayload()
		if err != nil {
			return err
		}
		for i, v := range val {
			fmt.Printf("\t\t%d : %d\n", i, v)
		}
	case schemapb.DataType_Int32:
		val, err := reader.GetInt32FromPayload()
		if err != nil {
			return err
		}
		for i, v := range val {
			fmt.Printf("\t\t%d : %d\n", i, v)
		}
	case schemapb.DataType_Int64:
		val, err := reader.GetInt64FromPayload()
		if err != nil {
			return err
		}
		for i, v := range val {
			fmt.Printf("\t\t%d : %d\n", i, v)
		}
	case schemapb.DataType_Float:
		val, err := reader.GetFloatFromPayload()
		if err != nil {
			return err
		}
		for i, v := range val {
			fmt.Printf("\t\t%d : %f\n", i, v)
		}
	case schemapb.DataType_Double:
		val, err := reader.GetDoubleFromPayload()
		if err != nil {
			return err
		}
		for i, v := range val {
			fmt.Printf("\t\t%d : %v\n", i, v)
		}
	case schemapb.DataType_String:
		rows, err := reader.GetPayloadLengthFromReader()
		if err != nil {
			return err
		}
		for i := 0; i < rows; i++ {
			val, err := reader.GetOneStringFromPayload(i)
			if err != nil {
				return err
			}
			fmt.Printf("\t\t%d : %s\n", i, val)
		}
	case schemapb.DataType_BinaryVector:
		val, dim, err := reader.GetBinaryVectorFromPayload()
		if err != nil {
			return err
		}
		dim = dim / 8
		length := len(val) / dim
		for i := 0; i < length; i++ {
			fmt.Printf("\t\t%d :", i)
			for j := 0; j < dim; j++ {
				idx := i*dim + j
				fmt.Printf(" %02x", val[idx])
			}
			fmt.Println()
		}
	case schemapb.DataType_FloatVector:
		val, dim, err := reader.GetFloatVectorFromPayload()
		if err != nil {
			return err
		}
		length := len(val) / dim
		for i := 0; i < length; i++ {
			fmt.Printf("\t\t%d :", i)
			for j := 0; j < dim; j++ {
				idx := i*dim + j
				fmt.Printf(" %f", val[idx])
			}
			fmt.Println()
		}
	default:
		return errors.New("undefined data type")
	}
	return nil
}

func printDDLPayloadValues(eventType EventTypeCode, colType schemapb.DataType, reader PayloadReaderInterface) error {
	fmt.Println("\tpayload values:")
	switch colType {
	case schemapb.DataType_Int64:
		val, err := reader.GetInt64FromPayload()
		if err != nil {
			return err
		}
		for i, v := range val {
			physical, logical := tsoutil.ParseTS(uint64(v))
			fmt.Printf("\t\t%d : physical : %v ; logical : %d\n", i, physical, logical)
		}
	case schemapb.DataType_String:
		rows, err := reader.GetPayloadLengthFromReader()
		if err != nil {
			return err
		}
		for i := 0; i < rows; i++ {
			val, err := reader.GetOneStringFromPayload(i)
			valBytes := []byte(val)
			if err != nil {
				return err
			}
			switch eventType {
			case CreateCollectionEventType:
				var req internalpb.CreateCollectionRequest
				if err := proto.Unmarshal(valBytes, &req); err != nil {
					return err
				}
				fmt.Printf("\t\t%d : create collection: %v\n", i, req)
			case DropCollectionEventType:
				var req internalpb.DropCollectionRequest
				if err := proto.Unmarshal(valBytes, &req); err != nil {
					return err
				}
				fmt.Printf("\t\t%d : drop collection: %v\n", i, req)
			case CreatePartitionEventType:
				var req internalpb.CreatePartitionRequest
				if err := proto.Unmarshal(valBytes, &req); err != nil {
					return err
				}
				fmt.Printf("\t\t%d : create partition: %v\n", i, req)
			case DropPartitionEventType:
				var req internalpb.DropPartitionRequest
				if err := proto.Unmarshal(valBytes, &req); err != nil {
					return err
				}
				fmt.Printf("\t\t%d : drop partition: %v\n", i, req)
			default:
				return fmt.Errorf("undefined ddl event type %d", eventType)
			}
		}
	default:
		return errors.New("undefined data type")
	}
	return nil
}
